---
title: Data Transforms for Hyperspectral Datacubes
keywords: fastai
sidebar: home_sidebar
summary: "Hyperspectral datacubes contain three axes (cross-track, along-track, and wavelength) and are stored in 3D numpy ndarrays. When using a pushbroom scanner, the datacube is filled gradually. The implementation here is based on a circular buffer, and we provide additional methods to apply a pipeline of transforms which, for example, can be used for smile correction and radiance conversion."
description: "Hyperspectral datacubes contain three axes (cross-track, along-track, and wavelength) and are stored in 3D numpy ndarrays. When using a pushbroom scanner, the datacube is filled gradually. The implementation here is based on a circular buffer, and we provide additional methods to apply a pipeline of transforms which, for example, can be used for smile correction and radiance conversion."
nb_path: "nbs/00_data.ipynb"
---
{% include tip.html content='This module can be imported using from openhsi.data import *' %}
For example, we can write to a 1D array:

```python
cib = CircArrayBuffer(size=(7,), axis=0)

for i in range(9):
    cib.put(i)

cib.show()

for i in range(9):
    print(i, cib.get())
```
Or to a 2D array:

```python
import holoviews as hv
hv.extension("bokeh", logo=False)

plots_list = []
cib = CircArrayBuffer(size=(4,4), axis=0)
cib.put(1) # scalars are broadcast to a 1D array

for i in range(5):
    cib.put(cib.get() + 1)
    plots_list.append( cib.show().opts(colorbar=True, title=f"i={i}") )

hv.Layout(plots_list).cols(3)
```
The OpenHSI camera has a settings dictionary which contains these fields:

- `camera_id`: your camera name
- `row_slice`: indicates which rows are illuminated; the rest are cropped out
- `resolution`: the full pixel resolution given by the camera without cropping
- `fwhm_nm`: the size of the spectral bands in nanometers
- `exposure_ms`: the camera exposure time last used
- `luminance`: the reference luminance used to convert digital numbers to luminance
- `longitude`: longitude in degrees east
- `latitude`: latitude in degrees north
- `datetime_str`: the UTC time at the time of data collection
- `altitude`: the altitude above sea level (assuming the target is at sea level), measured in km
- `radiosonde_station_num`: the station number from http://weather.uwyo.edu/upperair/sounding.html
- `radiosonde_region`: the region code from http://weather.uwyo.edu/upperair/sounding.html
- `sixs_path`: the path to the 6SV executable
- `binxy`: the number of pixels to bin in the (x, y) directions
- `win_offset`: (x, y) offsets from the edge of the detector for a selective readout window (used in combination with a `win_resolution` smaller than the full detector size)
- `win_resolution`: the size of the detector area to read out (width, height)
- `pixel_format`: the pixel format read out from the sensor, i.e. 8bit, 10bit, or 12bit

The settings dictionary may also contain additional camera-specific fields:

- `mac_addr`: the GigE camera MAC address (used by Lucid Vision sensors)
- `serial_num`: the serial number of the detector (used by Ximea and FLIR sensors)

The pickle file is a dictionary with these fields:

- `camera_id`: your camera name
- `HgAr_pic`: a picture of a mercury-argon lamp's spectral lines, used for wavelength calibration
- `flat_field_pic`: a picture of a well-lit scene, used for calculating the illuminated area
- `smile_shifts`: an array of pixel shifts needed to correct for smile error
- `wavelengths_linear`: an array of wavelengths after linear interpolation
- `wavelengths`: an array of wavelengths after cubic interpolation
- `rad_ref`: a 4D datacube with coordinates of cross-track, wavelength, exposure, and luminance
- `sfit`: the spline fit function from the integrating sphere calibration
- `rad_fit`: the interpolated function of the expected radiance at sensor, computed using 6SV

These files are unique to each OpenHSI camera.
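Since the calibration pickle is just a Python dictionary on disk, its contents can be inspected directly. Here is a minimal sketch using made-up stand-in values (a real `cam_calibration.pkl` holds full-size arrays and fitted functions):

```python
import pickle, tempfile, os

# A minimal stand-in for a calibration file (illustrative keys and values only)
calibration = {
    "camera_id":    "openhsi_example",
    "smile_shifts": [0] * 905,                            # one pixel shift per cross-track row
    "wavelengths":  [400 + 0.35*i for i in range(1231)],  # nm, after interpolation
}

# Round-trip through pickle, mirroring how CameraProperties loads pkl_path
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f:
    pickle.dump(calibration, f)
    tmp_path = f.name

with open(tmp_path, "rb") as f:
    loaded = pickle.load(f)
os.remove(tmp_path)

print(sorted(loaded.keys()))  # → ['camera_id', 'smile_shifts', 'wavelengths']
```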
For example, the contents of `CameraProperties` consist of two dictionaries. To produce the files `cam_settings.json` and `cam_calibration.pkl`, follow the steps outlined in the calibration module.
```python
cam_prop = CameraProperties(pkl_path="assets/cam_calibration.pkl")
cam_prop
```

```python
cam_prop.calibration["rad_ref"]
```
We can apply a number of transforms to the camera's raw data, and these transforms are used to modify the processing level during data collection. For example, we can perform a fast smile correction and wavelength binning during operation. With more processing, this is easily extended to obtain radiance and reflectance.
Some transforms require setup, which is done using `CameraProperties.tfm_setup`. This method also allows one to tack on an additional setup function via the `more_setup` argument, which takes any callable that can mutate the `CameraProperties` class.
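The `more_setup` hook can be sketched in miniature: any callable that accepts the object is run at the end of the standard setup. The class and transform names below are illustrative, not the actual openhsi implementation:

```python
# Minimal sketch of the more_setup idea: tfm_setup accepts any callable
# that mutates the object after the standard setup has run.
class Props:
    def __init__(self):
        self.tfm_list = []
    def tfm_setup(self, more_setup=None):
        self.tfm_list = ["crop", "smile_correct"]  # standard setup
        if callable(more_setup):
            more_setup(self)                       # caller-supplied extra setup

def add_dark_subtraction(props):
    props.tfm_list.append("dark_subtract")

p = Props()
p.tfm_setup(more_setup=add_dark_subtraction)
print(p.tfm_list)  # → ['crop', 'smile_correct', 'dark_subtract']
```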
You can add your own transform by monkey patching the `CameraProperties` class.

```python
@patch
def identity(self:CameraProperties, x:np.ndarray) -> np.ndarray:
    """The identity transform"""
    return x
```
If you don't require any camera settings or calibration files, a valid transform can be any Callable that takes in a 2D np.ndarray and returns a 2D np.ndarray.
Depending on the level of processing you want to do in real time, a number of transforms need to be composed in sequential order. To make this easy to customise, you can use the `pipeline` method and pass in a raw camera frame along with an ordered list of transforms.
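Under the hood, a pipeline like this is just left-to-right composition of callables. A minimal sketch, with made-up crop dimensions rather than the library's actual transforms:

```python
import numpy as np

# A pipeline is just sequential application of 2D-array -> 2D-array callables.
def pipeline(frame, tfms):
    for tfm in tfms:
        frame = tfm(frame)
    return frame

crop     = lambda x: x[10:-9, :]        # illustrative: crop unilluminated rows
to_float = lambda x: x.astype(np.float32)

frame = np.ones((924, 1240), dtype=np.uint16)
out = pipeline(frame, [crop, to_float])
print(out.shape, out.dtype)
```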
To make the transforms pipeline easy to use and customise, you can use the CameraProperties.set_processing_lvl method.
```python
# if wavelength calibration is changed, this needs to be updated
cam_prop = CameraProperties("assets/cam_settings.json","assets/cam_calibration.pkl")

cam_prop.set_processing_lvl(-1) # raw digital numbers
test_eq( (924,1240), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(0) # cropped
test_eq( (905, 1240), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(1) # fast smile corrected
test_eq( (905, 1231), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(2) # fast binned
test_eq( (905, 136), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop.set_processing_lvl(4) # radiance
test_eq( (905, 136), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

# cam_prop.set_processing_lvl(6) # reflectance
# test_eq( (452,108), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

# cam_prop.set_processing_lvl(5) # radiance conversion moved earlier in pipeline
# test_eq( (452,108), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )

cam_prop = CameraProperties("assets/cam_settings.json","assets/cam_calibration.pkl")
cam_prop.set_processing_lvl(3) # slow binned
test_eq( (905, 118), np.shape( cam_prop.pipeline(cam_prop.calibration["HgAr_pic"])) )
```
`DataCube` takes a line with coordinates of wavelength (x-axis) against cross-track (y-axis), and stores the smile-corrected version in its `CircArrayBuffer`.
To facilitate near real-time processing, a fast smile correction procedure is used. An option to use a fast binning procedure is also available. When using these two procedures, the overhead is roughly 2 ms on a Jetson board.
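A fast smile correction can be sketched as shifting each cross-track row by a precomputed integer offset and cropping to the common valid window. This is an illustrative reimplementation of the idea, not the library's actual code:

```python
import numpy as np

# Sketch: shift each cross-track row by its precomputed pixel offset,
# then crop so every row covers the same valid wavelength columns.
def fast_smile(frame, shifts):
    width = frame.shape[1] - max(shifts)
    out = np.empty((frame.shape[0], width), dtype=frame.dtype)
    for row, s in enumerate(shifts):
        out[row, :] = frame[row, s:s+width]
    return out

frame  = np.tile(np.arange(12, dtype=np.uint16), (4, 1))  # 4 rows x 12 columns
shifts = [0, 1, 2, 1]                                     # per-row pixel shifts
corrected = fast_smile(frame, shifts)
print(corrected.shape)  # → (4, 10)
```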
Instead of preallocating another buffer for another data collection, one can exploit the circular nature of the `DataCube` and reuse the internal buffer without modification - just use `DataCube.put` as normal.
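The wrap-around behaviour that makes this reuse safe can be sketched with a toy buffer (illustrative only, not the actual `CircArrayBuffer`):

```python
# Minimal sketch of circular-buffer reuse: once the write index wraps,
# new lines overwrite the oldest ones in place, so no reallocation is needed.
class CircBuffer:
    def __init__(self, n):
        self.data = [None] * n
        self.write_pos = 0
    def put(self, x):
        self.data[self.write_pos] = x
        self.write_pos = (self.write_pos + 1) % len(self.data)

buf = CircBuffer(4)
for i in range(6):   # 6 puts into 4 slots: slots 0 and 1 get overwritten
    buf.put(i)
print(buf.data)      # → [4, 5, 2, 3]
```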
All data buffers are preallocated, so it's no secret that hyperspectral datacubes are memory hungry. For reference:
| along-track pixels | wavelength binning | RAM needed | time to collect at 10 ms exposure | time to save to SSD |
|---|---|---|---|---|
| 4096 | 4 nm | ≈ 800 MB | ≈ 55 s | ≈ 3 s |
| 1024 | no binning | ≈ 4 GB | ≈ 14 s | ≈ 15 s |
In practice, it is very difficult to work with raw, unbinned data due to the massive RAM usage and the extended time needed to save the NetCDF file to disk, which hinders real-time analysis. Binning drops the frame rate (at 10 ms exposure) from 90 fps to 75 fps. In our experimentation, an SSD mounted in an M.2 slot on a Jetson board provided the fastest experience. When using other development boards such as a Raspberry Pi 4, the USB 3.0 port is recommended over the USB 2.0 port.
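The RAM figures follow from simple arithmetic: along-track lines × cross-track pixels × bands × bytes per value. A sketch, where the band count of 54 is an assumed value for illustration rather than one taken from the calibration files:

```python
# Back-of-envelope RAM estimate for a preallocated float32 datacube.
# The band count (54) and cross-track size (905) are illustrative assumptions.
def datacube_mb(along_track, cross_track, bands, bytes_per_px=4):
    return along_track * cross_track * bands * bytes_per_px / 2**20

mb = datacube_mb(along_track=4096, cross_track=905, bands=54)
print(f"{mb:.0f} MB")   # roughly matches the ~800 MB row in the table above
```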
```python
timestamps = DateTimeBuffer(n=8)
for i in range(8):
    timestamps.update()
timestamps.data
```
`load_nc` expects the datacube to have coordinates (wavelength, cross-track, along-track). This is a format that can be viewed in ENVI and QGIS. If you have a datacube with coordinates (cross-track, along-track, wavelength), then set the parameter `old_style=True`.
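The conversion between the two coordinate orders is a single `np.moveaxis` call; a sketch with illustrative dimensions:

```python
import numpy as np

# (wavelength, cross-track, along-track)  <->  (cross-track, along-track, wavelength)
new_style = np.zeros((136, 905, 256))      # wavelength-first, as saved to NetCDF
old_style = np.moveaxis(new_style, 0, -1)  # wavelength moved to the last axis
print(old_style.shape)  # → (905, 256, 136)
```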
The `plot_lib` argument selects the plotting backend.
Choose matplotlib if you want static figures.
Choose bokeh if you want to compose plots together and use interactive tools.
```python
@patch
def put(self:DataCube, x:np.ndarray):
    """Applies the composed transforms and writes the 2D array into the data cube. Stores a timestamp for each push."""
    self.timestamps.update()
    self.dc.put( self.pipeline(x) )
```
```python
@patch
def save(self:DataCube,
         save_dir:str,                   # Path to folder where all datacubes will be saved
         preconfig_meta_path:str = None, # Path to a .json file with metadata fields to be saved inside the datacube
         prefix:str = "",                # Prepend a custom prefix to your file name
         suffix:str = "",                # Append a custom suffix to your file name
        ):
    """Saves a NetCDF file (and an RGB representation) to directory `save_dir`, in a folder given by the date, with a file name given by the UTC time."""
    if preconfig_meta_path is not None:
        with open(preconfig_meta_path) as json_file:
            attrs = json.load(json_file)
    else: attrs = {}
    if hasattr(self, "ds_metadata"): attrs = self.ds_metadata

    self.directory = f"{save_dir}/{self.timestamps[0].strftime('%Y_%m_%d')}"
    Path(self.directory).mkdir(parents=True, exist_ok=True)

    if hasattr(self, "binned_wavelengths"):
        wavelengths = self.binned_wavelengths if self.proc_lvl not in (3,7,8) else self.λs
    else:
        wavelengths = np.arange(self.dc.data.shape[2])

    # time coordinates can only be saved in np.datetime64 format
    if hasattr(self, "cam_temperatures"):
        self.coords = dict(wavelength=(["wavelength"],wavelengths),
                           x=(["x"],np.arange(self.dc.data.shape[0])),
                           y=(["y"],np.arange(self.dc.data.shape[1])),
                           time=(["time"],self.timestamps.data.astype(np.datetime64)),
                           temperature=(["temperature"],self.cam_temperatures.data))
    else:
        self.coords = dict(wavelength=(["wavelength"],wavelengths),
                           x=(["x"],np.arange(self.dc.data.shape[0])),
                           y=(["y"],np.arange(self.dc.data.shape[1])),
                           time=(["time"],self.timestamps.data.astype(np.datetime64)))

    self.nc = xr.Dataset(data_vars=dict(datacube=(["wavelength","x","y"], np.moveaxis(self.dc.data, -1, 0))),
                         coords=self.coords, attrs=attrs)

    # provide metadata for the NetCDF coordinates
    self.nc.x.attrs["long_name"]   = "cross-track"
    self.nc.x.attrs["units"]       = "pixels"
    self.nc.x.attrs["description"] = "cross-track spatial coordinates"
    self.nc.y.attrs["long_name"]   = "along-track"
    self.nc.y.attrs["units"]       = "pixels"
    self.nc.y.attrs["description"] = "along-track spatial coordinates"
    self.nc.time.attrs["long_name"]   = "time"
    self.nc.time.attrs["description"] = "UTC timestamp for each along-track line"
    self.nc.wavelength.attrs["long_name"]   = "wavelength_nm"
    self.nc.wavelength.attrs["units"]       = "nanometers"
    self.nc.wavelength.attrs["description"] = "wavelength in nanometers."
    if hasattr(self, "cam_temperatures"):
        self.nc.temperature.attrs["long_name"]   = "camera temperature"
        self.nc.temperature.attrs["units"]       = "degrees Celsius"
        self.nc.temperature.attrs["description"] = "temperature of sensor at time of image capture"

    self.nc.datacube.attrs["long_name"] = "hyperspectral datacube"
    self.nc.datacube.attrs["units"] = "digital number"
    if self.proc_lvl in (4,5,7):   self.nc.datacube.attrs["units"] = "uW/cm^2/sr/nm"
    elif self.proc_lvl in (6,8):   self.nc.datacube.attrs["units"] = "percentage reflectance"
    self.nc.datacube.attrs["description"] = "hyperspectral datacube"

    self.nc.to_netcdf(f"{self.directory}/{prefix}{self.timestamps[0].strftime('%Y_%m_%d-%H_%M_%S')}{suffix}.nc")
    fig = self.show("matplotlib", hist_eq=True, quick_imshow=True)
    fig.savefig(f"{self.directory}/{prefix}{self.timestamps[0].strftime('%Y_%m_%d-%H_%M_%S')}{suffix}.png",
                bbox_inches='tight', pad_inches=0)
```
```python
@patch
def load_nc(self:DataCube,
            nc_path:str,            # Path to a NetCDF4 file
            old_style:bool = False, # Only for backwards compatibility with datacubes created before the first release
           ):
    """Lazy load a NetCDF datacube into the DataCube buffer."""
    with xr.open_dataset(nc_path) as ds:
        if old_style: # cross-track, along-track, wavelength
            self.dc = CircArrayBuffer(size=ds.datacube.shape, axis=1, dtype=type(np.array(ds.datacube[0,0])[0]))
            self.dc.data = np.array(ds.datacube)
        else: # wavelength, cross-track, along-track -> convert to old_style (so datacube inserts do not need a transpose)
            shape = (*ds.datacube.shape[1:], ds.datacube.shape[0])
            self.dc = CircArrayBuffer(size=shape, axis=1, dtype=type(np.array(ds.datacube[0,0])[0]))
            self.dc.data = np.moveaxis(np.array(ds.datacube), 0, -1)
        print(f"Allocated {4*reduce(lambda x,y: x*y, ds.datacube.shape)/2**20:.02f} MB of RAM.")

        # timestamps are stored as np.datetime64; convert back to datetime.datetime
        self.ds_timestamps = ds.time.to_numpy()
        unix_epoch = np.datetime64(0, 's')
        one_second = np.timedelta64(1, 's')
        seconds_since_epoch = (self.ds_timestamps - unix_epoch) / one_second
        self.ds_timestamps = np.array([datetime.utcfromtimestamp(s) for s in seconds_since_epoch])
        self.timestamps.data = self.ds_timestamps
        self.ds_metadata = ds.attrs

        if hasattr(ds, "temperature"):
            self.ds_temperatures = ds.temperature.to_numpy()
            self.cam_temperatures = CircArrayBuffer(size=self.ds_temperatures.shape, dtype=np.float32)
            self.cam_temperatures.data = self.ds_temperatures

        self.binned_wavelengths = np.array(ds.wavelength)
        self.dc.slots_left = 0 # indicate that the data buffer is full
```
```python
@patch
def show(self:DataCube,
         plot_lib:str = "bokeh",    # Plotting backend: can be 'bokeh' or 'matplotlib'
         red_nm:float = 640.,       # Wavelength in nm to use as the red
         green_nm:float = 550.,     # Wavelength in nm to use as the green
         blue_nm:float = 470.,      # Wavelength in nm to use as the blue
         robust:bool = False,       # Choose to plot using the 2-98% percentile. Robust to outliers
         hist_eq:bool = False,      # Choose to plot using histogram equalisation
         quick_imshow:bool = False, # Used to skip holoviews and use matplotlib for a static plot
         **plot_kwargs,             # Any other plotting options to be used in your plotting backend
        ) -> "bokeh or matplotlib plot":
    """Generate an RGB image from chosen RGB wavelengths with histogram equalisation or percentile options.
    The plotting backend can be specified by `plot_lib` and can be "bokeh" or "matplotlib".
    Further customise your plot with `**plot_kwargs`. `quick_imshow` is used for saving figures quickly
    but cannot be used to make interactive plots."""
    rgb = np.zeros( (*self.dc.data.shape[:2], 3), dtype=np.float32)
    if hasattr(self, "binned_wavelengths"):
        rgb[...,0] = self.dc.data[:,:,np.argmin(np.abs(self.binned_wavelengths-red_nm))]
        rgb[...,1] = self.dc.data[:,:,np.argmin(np.abs(self.binned_wavelengths-green_nm))]
        rgb[...,2] = self.dc.data[:,:,np.argmin(np.abs(self.binned_wavelengths-blue_nm))]
    else: # no wavelength information: use the middle band for all three channels
        rgb[...,0] = self.dc.data[:,:,self.dc.data.shape[2]//2]
        rgb[...,1] = self.dc.data[:,:,self.dc.data.shape[2]//2]
        rgb[...,2] = self.dc.data[:,:,self.dc.data.shape[2]//2]

    if robust and not hist_eq: # scale everything to the 2% and 98% percentiles
        vmax = np.nanpercentile(rgb, 98)
        vmin = np.nanpercentile(rgb, 2)
        rgb = ((rgb.astype("f8") - vmin) / (vmax - vmin)).astype("f4")
        rgb = np.minimum(np.maximum(rgb, 0), 1)
    elif hist_eq and not robust:
        img_hist, bins = np.histogram(rgb.flatten(), 256, density=True)
        cdf = img_hist.cumsum()  # cumulative distribution function
        cdf = 1. * cdf / cdf[-1] # normalise
        img_eq = np.interp(rgb.flatten(), bins[:-1], cdf) # find new pixel values from linear interpolation of cdf
        rgb = img_eq.reshape(rgb.shape)
    elif robust and hist_eq:
        warnings.warn("Cannot mix robust with histogram equalisation. No RGB adjustments will be made.", stacklevel=2)
        rgb /= np.max(rgb)
    else:
        rgb /= np.max(rgb)

    if quick_imshow:
        fig, ax = plt.subplots(figsize=(12,3))
        ax.imshow(rgb, aspect="equal"); ax.set_xlabel("along-track"); ax.set_ylabel("cross-track")
        return fig

    import holoviews as hv
    hv.extension(plot_lib, logo=False)
    rgb_hv = hv.RGB((np.arange(rgb.shape[1]), np.arange(rgb.shape[0]),
                     rgb[:,:,0], rgb[:,:,1], rgb[:,:,2]))
    if plot_lib == "bokeh":
        return rgb_hv.opts(width=1000, height=250, frame_height=int(rgb.shape[0]//3)).opts(**plot_kwargs).opts(
                    xlabel="along-track", ylabel="cross-track", invert_yaxis=True)
    else: # plot_lib == "matplotlib"
        return rgb_hv.opts(fig_inches=22).opts(**plot_kwargs).opts(
                    xlabel="along-track", ylabel="cross-track", invert_yaxis=True)
```
```python
n = 256
dc = DataCube(n_lines=n, processing_lvl=2,
              json_path="assets/cam_settings.json", pkl_path="assets/cam_calibration.pkl")

for i in range(200):
    dc.put( np.random.randint(0, 255, dc.settings["resolution"]) )

dc.show("bokeh")
dc.save("assets")
```